﻿using Actor.Net.Network.Gate.Adapter;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace Actor.Net.Network.Gate.Tcp
{
    /// <summary>
    /// TCP channel built on <see cref="Socket"/> and <see cref="SocketAsyncEventArgs"/>.
    /// It can wrap a socket that is handed in by the caller, or open an outbound
    /// connection itself through <c>Connect</c> / <c>ConnectAsync</c>.
    /// </summary>
    public class GateTcpChannel : GateBaseChannel
    {
        /// <summary>Event args used for receive operations.</summary>
        private SocketAsyncEventArgs inputArgs { get; set; }

        /// <summary>Event args shared by connect and send operations.</summary>
        private SocketAsyncEventArgs outputArgs { get; set; }

        /// <summary>Buffer that every receive operation reads into.</summary>
        private byte[] receiveBuffer { get; set; } = new byte[GatePacketParser.BufferLength];

        /// <summary>Pool of idle, unsignalled wait handles used by the blocking <c>Connect</c>.</summary>
        private static ConcurrentQueue<AutoResetEvent> autoResetQueue { get; } = new ConcurrentQueue<AutoResetEvent>();

        /// <summary>Outgoing data queued by <c>Write</c> and drained by <c>StartSend</c>.</summary>
        private ArrayBuffer sendBuffer { get; } = new ArrayBuffer();

        /// <summary>True while a send operation owns <see cref="outputArgs"/>.</summary>
        private volatile bool isStartSend = false;

        /// <summary>Guards reads and writes of <see cref="isStartSend"/>.</summary>
        private readonly object locker = new object();

        /// <summary>
        /// Wraps the given socket, records its endpoints and immediately starts receiving.
        /// </summary>
        /// <param name="socket">Socket to wrap; its local and remote endpoints are copied to the channel.</param>
        /// <param name="netService">Service passed through to the base channel.</param>
        /// <param name="channelId">Channel id passed through to the base channel.</param>
        public GateTcpChannel(Socket socket, GateBaseService netService, int channelId)
            : base(netService, channelId)
        {
            this.CreateEventArgs();

            this.NetSocket = socket;
            this.LocalEndPoint = socket.LocalEndPoint;
            this.RemoteEndPoint = socket.RemoteEndPoint;
            this.Read();
        }

        /// <summary>
        /// Creates a channel without a socket; the socket is created on the first
        /// <c>Connect</c> / <c>ConnectAsync</c> call.
        /// </summary>
        /// <param name="netService">Service passed through to the base channel.</param>
        /// <param name="channelId">Channel id passed through to the base channel.</param>
        public GateTcpChannel(GateBaseService netService, int channelId) : base(netService, channelId)
        {

        }

        /// <summary>
        /// True when a socket exists and reports itself as connected.
        /// </summary>
        public override bool Connected
        {
            get
            {
                // Read the property once so a concurrent Dispose cannot null it between the two checks.
                var socket = this.NetSocket;
                return socket != null && socket.Connected;
            }
        }

        /// <summary>
        /// Connects to <c>Network.ConnectEndPoint</c>, blocking the caller for at most
        /// <paramref name="timeoutMillisecond"/> milliseconds.
        /// </summary>
        /// <param name="timeoutMillisecond">Maximum time to wait for the connect operation to complete.</param>
        /// <returns>The value of <see cref="Connected"/> once the wait is over.</returns>
        public override bool Connect(int timeoutMillisecond)
        {
            // Check before taking a wait handle so an already-connected channel does not drain the pool.
            if (this.Connected)
            {
                return true;
            }

            if (!autoResetQueue.TryDequeue(out AutoResetEvent autoReset))
            {
                autoReset = new AutoResetEvent(false);
            }

            // The handle goes back to the pool only when it is known to be idle and unsignalled.
            bool recycle = false;
            try
            {
                this.EnsureSocket();

                this.outputArgs.UserToken = autoReset;
                this.outputArgs.RemoteEndPoint = this.Network.ConnectEndPoint;
                if (!this.NetSocket.ConnectAsync(this.outputArgs))
                {
                    // Completed synchronously: the completion handler signalled the handle
                    // while nobody was waiting, so clear the signal before pooling it.
                    ProcessConnectCompleted(this.outputArgs);
                    autoReset.Reset();
                    recycle = true;
                }
                else
                {
                    // A successful wait consumes the signal. After a timeout the completion
                    // handler may still signal the handle later, so it is abandoned
                    // (not pooled and not disposed) in that case.
                    recycle = autoReset.WaitOne(timeoutMillisecond);
                }
            }
            catch (Exception ex)
            {
                this.NetService.ProcessSocketError(this.ChannelContext, ex);
            }
            finally
            {
                if (recycle)
                {
                    autoResetQueue.Enqueue(autoReset);
                }
            }

            if (!this.Connected)
            {
                this.ProcessAutoReconnecting(false);
            }

            return this.Connected;
        }

        /// <summary>
        /// Connects to <c>Network.ConnectEndPoint</c> without blocking the caller.
        /// </summary>
        /// <param name="cancellationToken">
        /// When cancelled, the wait ends early; the underlying connect operation itself is not aborted here.
        /// </param>
        /// <returns>The value of <see cref="Connected"/> once the wait is over.</returns>
        public override async Task<bool> ConnectAsync(CancellationToken cancellationToken)
        {
            try
            {
                if (this.Connected)
                {
                    return true;
                }

                this.EnsureSocket();

                // Continuations run asynchronously so the code after the await does not
                // execute inline on the thread that completes the socket operation.
                var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

                // Dispose the registration once the wait is over so it does not outlive this call.
                using (cancellationToken.Register(() => tcs.TrySetResult(false)))
                {
                    this.outputArgs.UserToken = tcs;
                    this.outputArgs.RemoteEndPoint = this.Network.ConnectEndPoint;
                    if (!this.NetSocket.ConnectAsync(this.outputArgs))
                    {
                        ProcessConnectCompleted(this.outputArgs);
                    }

                    await tcs.Task;
                }
            }
            catch (Exception ex)
            {
                this.NetService.ProcessSocketError(this.ChannelContext, ex);
            }

            if (!this.Connected)
            {
                this.ProcessAutoReconnecting(false);
            }

            return this.Connected;
        }

        /// <summary>
        /// Queues <paramref name="count"/> bytes of <paramref name="bytes"/> for sending.
        /// A null array is ignored.
        /// </summary>
        /// <param name="bytes">Data to queue.</param>
        /// <param name="count">Number of bytes to queue, starting at index 0.</param>
        /// <exception cref="SocketException">
        /// <paramref name="count"/> is smaller than <c>GatePacketParser.HeadSize</c>.
        /// </exception>
        public override void Write(byte[] bytes, int count)
        {
            if (bytes == null)
            {
                return;
            }

            if (count < GatePacketParser.HeadSize)
            {
                throw new SocketException((int)SocketError.InvalidArgument);
            }

            sendBuffer.Write(bytes, 0, count);

            // If the send queue holds more than 32 entries, send the data immediately.
            if (sendBuffer.QueueCount > 32)
            {
                StartSend();
            }
        }

        /// <summary>
        /// Tries to claim the send slot.
        /// </summary>
        /// <returns>True when the caller now owns the slot; false when a send is already in progress.</returns>
        private bool IsCanSend()
        {
            lock (this.locker)
            {
                if (this.isStartSend)
                {
                    return false;
                }

                this.isStartSend = true;
                return true;
            }
        }

        /// <summary>
        /// Releases the send slot claimed by <see cref="IsCanSend"/>.
        /// </summary>
        private void SetCanSend()
        {
            lock (this.locker)
            {
                if (this.isStartSend)
                {
                    this.isStartSend = false;
                }
            }
        }

        /// <summary>
        /// Sends the next chunk from the send buffer, unless the channel is disconnected
        /// or a send is already in progress.
        /// </summary>
        public override void StartSend()
        {
            if (!this.Connected)
            {
                return;
            }

            // Only the caller that claims the slot may touch outputArgs; everyone else backs off.
            if (!this.IsCanSend())
            {
                return;
            }

            try
            {
                var bytes = sendBuffer.Get(out int length);
                if (length == 0)
                {
                    this.SetCanSend();
                    return;
                }

                this.outputArgs.SetBuffer(bytes, 0, length);
                if (!this.NetSocket.SendAsync(this.outputArgs))
                {
                    // Completed synchronously: handle the result on this thread.
                    this.ProcessSendCompleted(this.outputArgs);
                }
            }
            catch (Exception ex)
            {
                this.SetCanSend();
                this.NetService.ProcessSocketError(this.ChannelContext, ex);
                this.ProcessAutoReconnecting();
            }
        }

        /// <summary>
        /// Starts a receive into the receive buffer; does nothing when disconnected.
        /// </summary>
        public override void Read()
        {
            try
            {
                if (!this.Connected)
                {
                    return;
                }

                this.inputArgs.SetBuffer(this.receiveBuffer, 0, this.receiveBuffer.Length);
                if (!this.NetSocket.ReceiveAsync(this.inputArgs))
                {
                    // Completed synchronously: handle the result on this thread.
                    this.ProcessReceiveCompleted(this.inputArgs);
                }
            }
            catch (Exception ex)
            {
                this.NetService.ProcessSocketError(this.ChannelContext, ex);
                this.ProcessAutoReconnecting();
            }
        }

        /// <summary>
        /// Completion callback for both event-args instances. Connect completions are handled
        /// inline; receive, send and disconnect completions are posted through <c>Network.Post</c>.
        /// </summary>
        private void OnComplete(object sender, SocketAsyncEventArgs e)
        {
            switch (e.LastOperation)
            {
                case SocketAsyncOperation.Connect:
                    this.ProcessConnectCompleted(e);
                    break;
                case SocketAsyncOperation.Receive:
                    {
                        Action action = () => { this.ProcessReceiveCompleted(e); };
                        this.Network.Post(this.Network.ProcessContextAction, action);
                    }
                    break;
                case SocketAsyncOperation.Send:
                    {
                        Action action = () => { this.ProcessSendCompleted(e); };
                        this.Network.Post(this.Network.ProcessContextAction, action);
                    }
                    break;
                case SocketAsyncOperation.Disconnect:
                    {
                        // NOTE(review): Disconnect() below also calls ProcessDisconnected, so the
                        // service may be notified twice here - confirm this is intended.
                        Action action = () => { this.NetService.ProcessDisconnected(this.ChannelContext); };
                        this.Network.Post(this.Network.ProcessContextAction, action);
                        this.Disconnect();
                    }
                    break;
                default:
                    // Report instead of throwing: nothing in this class catches an exception
                    // thrown from this completion callback.
                    this.NetService.ProcessSocketError(
                        this.ChannelContext,
                        new NotSupportedException($"socket error: {e.LastOperation}"));
                    break;
            }
        }

        /// <summary>
        /// Handles the outcome of a connect operation and always releases whoever is waiting
        /// on it (the blocking <c>Connect</c> or the awaiting <c>ConnectAsync</c>),
        /// whether the connect succeeded or failed.
        /// </summary>
        private void ProcessConnectCompleted(SocketAsyncEventArgs e)
        {
            // Detach the waiter first; the failure path below disposes the event args.
            object waiter = e.UserToken;
            e.UserToken = null;

            bool connected = false;
            try
            {
                if (e.SocketError != SocketError.Success)
                {
                    this.ProcessAutoReconnecting();
                    return;
                }

                this.LocalEndPoint = this.NetSocket.LocalEndPoint;
                this.RemoteEndPoint = this.Network.ConnectEndPoint;

                Action action = () => { this.NetService.ProcessConnected(this.ChannelContext); };
                this.Network.Post(this.Network.ProcessContextAction, action);
                connected = true;
            }
            finally
            {
                SignalConnectWaiter(waiter, connected);
            }

            if (this.NetService.ServiceType == NetServiceType.Client)
            {
                this.Read();
            }
        }

        /// <summary>
        /// Wakes the party stored in the connect operation's user token, if any.
        /// </summary>
        private static void SignalConnectWaiter(object waiter, bool connected)
        {
            if (waiter is TaskCompletionSource<bool> tcs)
            {
                tcs.TrySetResult(connected);
            }
            else if (waiter is AutoResetEvent autoReset)
            {
                // Connect() owns the handle and decides whether it returns to the pool.
                autoReset.Set();
            }
        }

        /// <summary>
        /// Handles the outcome of a send operation: releases the send slot and, on success,
        /// continues with the next chunk.
        /// </summary>
        private void ProcessSendCompleted(SocketAsyncEventArgs e)
        {
            try
            {
                if (this.NetSocket == null)
                {
                    return;
                }

                if (e.SocketError != SocketError.Success || e.BytesTransferred == 0)
                {
                    this.ProcessAutoReconnecting();
                    return;
                }
            }
            finally
            {
                this.SetCanSend();
            }

            this.StartSend();
        }

        /// <summary>
        /// Handles the outcome of a receive operation: feeds the received bytes to the parser,
        /// dispatches every complete message, then starts the next receive.
        /// </summary>
        private void ProcessReceiveCompleted(SocketAsyncEventArgs e)
        {
            if (e.SocketError != SocketError.Success || e.BytesTransferred <= 0)
            {
                this.ProcessAutoReconnecting();
                return;
            }

            this.LastReceiveMillisecond = GateTimeUtils.NowMilliSecond();
            int count = e.BytesTransferred;
            int offset = 0;
            while (count != 0)
            {
                try
                {
                    if (!this.Parser.Parse(this.receiveBuffer, ref offset, ref count))
                    {
                        continue;
                    }

                    GateTransferContext context = this.Parser.GetTransferContext(this.ChannelContext);
                    if (this.ServiceType != NetServiceType.Server && context.RpcId > 0)
                    {
                        // Non-server side with an rpc id: complete the pending call, if it is still registered.
                        if (this.CallContextList.TryRemove(context.RpcId, out IGateCallContext callContext))
                        {
                            callContext.SetDeserializationResult(context);
                        }
                    }
                    else
                    {
                        this.DispatchToAdapter(context);
                    }

                    this.Parser.Flush();
                }
                catch (Exception ex)
                {
                    this.NetService.ProcessSocketError(this.ChannelContext, ex);
                    this.ProcessAutoReconnecting();
                    return;
                }
            }

            this.Read();
        }

        /// <summary>
        /// Hands a parsed message to the adapter registered for its command. Exceptions are
        /// reported through <c>ProcessSocketError</c> and do not interrupt the receive loop.
        /// </summary>
        private void DispatchToAdapter(GateTransferContext context)
        {
            try
            {
                IGateMessageAdapter adapter = this.Network.GetAdapter(context.Command);
                adapter?.DispatchAdapter(context);
            }
            catch (Exception ex)
            {
                this.NetService.ProcessSocketError(this.ChannelContext, ex);
            }
        }

        /// <summary>
        /// Optionally disconnects, then calls <c>Reconnect</c> when the channel is a
        /// disconnected client and <c>Network.AutoReconnecting</c> is set.
        /// </summary>
        /// <param name="disconnect">When true, <see cref="Disconnect"/> is called first.</param>
        private void ProcessAutoReconnecting(bool disconnect = true)
        {
            if (disconnect)
            {
                this.Disconnect();
            }

            if (this.Connected)
            {
                return;
            }

            if (this.ServiceType == NetServiceType.Client && this.Network.AutoReconnecting)
            {
                this.Reconnect();
            }
        }

        /// <summary>
        /// Tears the connection down: flushes the parser, completes all pending calls with
        /// their default result, notifies the service and disposes the socket resources.
        /// Does nothing when there is no socket.
        /// </summary>
        public override void Disconnect()
        {
            if (this.NetSocket == null)
            {
                return;
            }

            this.Parser.Flush();
            foreach (var callContext in this.CallContextList.Values)
            {
                callContext.SetDefaultResult();
            }

            this.CallContextList.Clear();
            this.NetService.ProcessDisconnected(this.ChannelContext);

            this.Dispose();
        }

        /// <summary>
        /// Releases the socket, both event-args instances and the send buffer. Safe to call
        /// when the channel never connected or its socket has already been released.
        /// </summary>
        public override void Dispose()
        {
            var socket = this.NetSocket;
            this.NetSocket = null;
            if (socket != null)
            {
                socket.Close();
                socket.Dispose();
            }

            var input = this.inputArgs;
            var output = this.outputArgs;
            this.inputArgs = null;
            this.outputArgs = null;
            input?.Dispose();
            output?.Dispose();

            // NOTE(review): the send buffer is disposed here but reused if the channel
            // reconnects afterwards - confirm ArrayBuffer tolerates use after Dispose.
            this.sendBuffer.Dispose();
        }

        /// <summary>
        /// Creates both event-args instances and subscribes <see cref="OnComplete"/> to them.
        /// </summary>
        private void CreateEventArgs()
        {
            this.inputArgs = new SocketAsyncEventArgs();
            this.outputArgs = new SocketAsyncEventArgs();

            this.inputArgs.Completed += OnComplete;
            this.outputArgs.Completed += OnComplete;
        }

        /// <summary>
        /// Creates the IPv4 TCP socket and its event args when the channel has no socket yet.
        /// </summary>
        private void EnsureSocket()
        {
            if (this.NetSocket != null)
            {
                return;
            }

            this.NetSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp)
            {
                SendTimeout = 1000 * 120,
                NoDelay = true
            };

            this.CreateEventArgs();
        }
    }
}
